﻿using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Away.EventBusCore.DomainEvents.Impl;

/// <summary>
/// Publishes domain events to a durable direct RabbitMQ exchange and consumes them
/// from one durable queue per registered event id, dispatching each received event
/// through <see cref="IMediator"/>.
/// </summary>
public class RemoteDomainEventPublisher : IDomainEventPublisher, IDomainEventHost
{
    private IConnection? _conn;
    private readonly IServiceScope _scope;

    /// <summary>
    /// Creates a service scope used to resolve dependencies and declares the
    /// configured exchange (direct, durable, not auto-deleted).
    /// </summary>
    /// <param name="serviceProvider">Provider from which the scope is created.</param>
    public RemoteDomainEventPublisher(IServiceProvider serviceProvider)
    {
        ArgumentNullException.ThrowIfNull(serviceProvider);

        _scope = serviceProvider.CreateScope();
        try
        {
            using var channel = Connection.CreateModel();
            channel.ExchangeDeclare(
                exchange: Options.ExchangeName,
                type: ExchangeType.Direct,
                durable: true,
                autoDelete: false);
        }
        catch
        {
            // The instance is never handed to a caller when construction fails,
            // so release the scope (and any connection already opened) here.
            Dispose();
            throw;
        }
    }

    private IServiceProvider ServiceProvider => _scope.ServiceProvider;
    private ILogger<RemoteDomainEventPublisher> Logger => ServiceProvider.GetRequiredService<ILogger<RemoteDomainEventPublisher>>();
    private IMediator Mediator => ServiceProvider.GetRequiredService<IMediator>();
    private DomainEventOptionsBuilder Options => ServiceProvider.GetRequiredService<IOptionsMonitor<DomainEventOptionsBuilder>>().CurrentValue;

    /// <summary>
    /// Returns the cached connection while it reports itself open; otherwise
    /// creates a new one from the configured connection string and caches it.
    /// </summary>
    private IConnection Connection
    {
        get
        {
            if (_conn is { IsOpen: true })
            {
                return _conn;
            }

            ConnectionFactory factory = new()
            {
                Uri = new Uri(Options.ConnectionString!)
            };
            _conn = factory.CreateConnection();
            return _conn;
        }
    }

    /// <summary>
    /// Publishes <paramref name="e"/> using its own <c>EventId</c> as the routing key.
    /// </summary>
    /// <typeparam name="TEvent">Concrete domain event type.</typeparam>
    /// <param name="e">Event to publish.</param>
    /// <returns>A completed task; the publish itself is performed synchronously.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="e"/> is null.</exception>
    public Task Publish<TEvent>(TEvent e) where TEvent : DomainEvent
    {
        ArgumentNullException.ThrowIfNull(e);
        return Publish(e.EventId, e);
    }

    /// <summary>
    /// Serializes <paramref name="e"/> to JSON and publishes it as a persistent message
    /// to the configured exchange with <paramref name="eventId"/> as the routing key.
    /// </summary>
    /// <param name="eventId">Routing key identifying the event.</param>
    /// <param name="e">Event payload; serialized with <see cref="JsonSerializer"/>.</param>
    /// <returns>A completed task; the publish itself is performed synchronously.</returns>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    public Task Publish(string eventId, object e)
    {
        ArgumentNullException.ThrowIfNull(eventId);
        ArgumentNullException.ThrowIfNull(e);

        var payload = JsonSerializer.Serialize(e);
        // JSON is UTF-8 by definition; Encoding.Default varies by runtime/platform.
        var body = Encoding.UTF8.GetBytes(payload);
        Logger.LogTrace("\r\n\r\npublish\r\nchannel:{EventId}\r\npayload:{Payload}\r\n\r\n", eventId, payload);

        using var channel = Connection.CreateModel();
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(
            exchange: Options.ExchangeName,
            routingKey: eventId,
            mandatory: true,
            basicProperties: properties,
            body: body);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Starts one consumer for every event id registered in <c>Options.EventBusTypes</c>.
    /// </summary>
    /// <returns>A completed task once all consumers have been registered.</returns>
    public Task Listen()
    {
        Logger.LogInformation("DomainEvnet 远程模式");
        foreach (var (eventId, _) in Options.EventBusTypes)
        {
            StartConsumer(eventId);
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Declares and binds a durable queue named after <paramref name="eventId"/> and
    /// starts a manually-acknowledged consumer on a dedicated channel.
    /// </summary>
    /// <param name="eventId">Event id used as both queue name and routing key.</param>
    private void StartConsumer(string eventId)
    {
        var exchangeName = Options.ExchangeName;

        // The channel is intentionally not disposed here: it must stay open for the
        // consumer. It is owned by the connection, which is disposed in Dispose().
        var channel = Connection.CreateModel();

        // NOTE(review): this binds the exchange to itself; kept as in the original
        // because its intent cannot be determined from this file - confirm it is needed.
        channel.ExchangeBind(exchangeName, exchangeName, routingKey: string.Empty);
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
        channel.QueueDeclare(queue: eventId, durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind(queue: eventId, exchange: exchangeName, routingKey: eventId);

        var consumer = new EventingBasicConsumer(channel);
        // HandleDelivery never faults (it catches everything), so discarding the task is safe.
        consumer.Received += (_, args) => _ = HandleDelivery(channel, args);

        // Manual acknowledgements: BasicReject/BasicAck are only valid when autoAck is
        // false, and the prefetch limit set above only applies in this mode.
        channel.BasicConsume(queue: eventId, autoAck: false, consumer: consumer);
    }

    /// <summary>
    /// Processes a single delivery: decodes the body, resolves the event type from the
    /// routing key, deserializes and dispatches it through the mediator, then acks.
    /// Deliveries that cannot be processed are rejected without requeue.
    /// </summary>
    /// <param name="channel">Channel the delivery arrived on; used to ack/reject.</param>
    /// <param name="args">Delivery data supplied by the consumer.</param>
    private async Task HandleDelivery(IModel channel, BasicDeliverEventArgs args)
    {
        var routingKey = args.RoutingKey;
        try
        {
            var payload = Encoding.UTF8.GetString(args.Body.ToArray());
            Logger.LogTrace("\r\n\r\nlisten\r\neventId:{EventId}\r\npayload:{Payload}\r\n\r\n", routingKey, payload);

            if (string.IsNullOrWhiteSpace(payload))
            {
                Logger.LogWarning("payload is null:{EventId}", routingKey);
                channel.BasicReject(args.DeliveryTag, false);
                return;
            }

            if (!Options.EventBusTypes.TryGetValue(routingKey, out var type) || type == null)
            {
                Logger.LogWarning("eventId is not Subscribe:{EventId}", routingKey);
                channel.BasicReject(args.DeliveryTag, false);
                return;
            }

            var e = JsonSerializer.Deserialize(payload, type);
            if (e == null)
            {
                // A literal JSON "null" deserializes to null; there is nothing to dispatch.
                Logger.LogWarning("payload is null:{EventId}", routingKey);
                channel.BasicReject(args.DeliveryTag, false);
                return;
            }

            await Mediator.Publish(e);
            channel.BasicAck(args.DeliveryTag, multiple: false);
        }
        catch (Exception ex)
        {
            // Must not escape: this runs from an event handler, where an unhandled
            // exception would be unobserved or could terminate the process.
            Logger.LogError(ex, "failed to handle event:{EventId}", routingKey);
            TryReject(channel, args.DeliveryTag);
        }
    }

    /// <summary>
    /// Rejects a delivery without requeue, logging instead of throwing if the
    /// channel can no longer accept the command.
    /// </summary>
    /// <param name="channel">Channel the delivery arrived on.</param>
    /// <param name="deliveryTag">Tag of the delivery to reject.</param>
    private void TryReject(IModel channel, ulong deliveryTag)
    {
        try
        {
            channel.BasicReject(deliveryTag, false);
        }
        catch (Exception ex)
        {
            Logger.LogWarning(ex, "failed to reject delivery:{DeliveryTag}", deliveryTag);
        }
    }

    /// <summary>
    /// Disposes the cached connection (if any) and the service scope.
    /// </summary>
    public void Dispose()
    {
        _conn?.Dispose();
        _scope.Dispose();
    }
}
